﻿using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MessageLib.Helper;
using MessageLib.Model;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace MessageLib
{

    /// <summary>
    /// Bookkeeping entry for one subscribed topic: the topic (queue) name and a
    /// flag telling whether a consumer is currently attached to it.
    /// </summary>
    public class Cache
    {
        // Volatile because the flag is written by background tasks (the
        // connection monitor and the consumer loops) and read from the thread
        // that calls Subscribe; without it a reader may keep seeing a stale value.
        private volatile bool _state;

        /// <summary>
        /// Name of the topic (queue) this entry tracks.
        /// </summary>
        public string TopicName { get; set; }

        /// <summary>
        /// <c>true</c> while the topic is considered subscribed; <c>false</c>
        /// when it still has to be (re)subscribed.
        /// </summary>
        public bool State
        {
            get { return _state; }
            set { _state = value; }
        }
    }

    /// <summary>
    /// Thin RabbitMQ client: publishes serialized messages to durable queues and
    /// consumes subscribed queues on background tasks, raising
    /// <see cref="PublishMessageEvent"/> for every message received.
    /// A background monitor re-establishes the connection and the subscriptions
    /// when the connection is marked as down or no message has been consumed
    /// for <see cref="HeartbeatTimeout"/>.
    /// </summary>
    public class RabbitMqClient
    {
        /// <summary>Pause between two passes of the connection monitor.</summary>
        private static readonly TimeSpan MonitorInterval = TimeSpan.FromSeconds(30);

        /// <summary>Time without a heartbeat after which the connection is rebuilt.</summary>
        private static readonly TimeSpan HeartbeatTimeout = TimeSpan.FromMinutes(3);

        private readonly ConnectionFactory _factory;
        private readonly ISerialize _serialize;

        /// <summary>Guards <see cref="_cache"/>, <see cref="_conn"/> and the subscription flags.</summary>
        private readonly object _sync = new object();

        /// <summary>Subscribed topics, keyed by topic name.</summary>
        private readonly Dictionary<string, Cache> _cache = new Dictionary<string, Cache>();

        /// <summary>Current connection; <c>null</c> while disconnected or reconnecting.</summary>
        private IConnection _conn;

        /// <summary>Network state: <c>true</c> while the current connection is believed usable.</summary>
        private volatile bool _networkState;

        /// <summary>
        /// Heartbeat time as UTC ticks (a long so it can be read and written
        /// atomically from several threads). Refreshed when a message is
        /// consumed and when a connection is (re)established.
        /// </summary>
        private long _heartbeatTicks;

        /// <summary>
        /// Raised for every consumed message with the topic name and the
        /// UTF-8 decoded message body.
        /// </summary>
        public event Action<string, string> PublishMessageEvent;

        /// <summary>
        /// Creates a client that talks to the built-in default broker.
        /// </summary>
        /// <param name="serialize">Serializer used by <see cref="Push{T}"/> when none is supplied per call.</param>
        public RabbitMqClient(ISerialize serialize)
            : this(serialize, CreateDefaultFactory())
        {
        }

        /// <summary>
        /// Creates a client that connects through the supplied factory, so that
        /// host and credentials can come from configuration instead of source code.
        /// The constructor does not throw when the broker is unreachable; the
        /// monitor keeps retrying in the background.
        /// </summary>
        /// <param name="serialize">Serializer used by <see cref="Push{T}"/> when none is supplied per call.</param>
        /// <param name="factory">Factory used to open (and re-open) the connection.</param>
        /// <exception cref="ArgumentNullException"><paramref name="factory"/> is <c>null</c>.</exception>
        public RabbitMqClient(ISerialize serialize, ConnectionFactory factory)
        {
            if (factory == null)
            {
                throw new ArgumentNullException(nameof(factory));
            }

            _serialize = serialize;
            _factory = factory;
            TouchHeartbeat();

            // Connect BEFORE the monitor starts. Previously both the constructor
            // and the freshly started monitor saw "not connected" and each opened
            // a connection, leaking one of them.
            try
            {
                Reconnect();
            }
            catch (Exception ex)
            {
                LogFailure("initial connection", ex);
            }

            // LongRunning: the loop never ends, so it must not occupy a pool thread.
            Task.Factory.StartNew(
                () => MonitorLoop(),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default);
        }

        /// <summary>
        /// Raises <see cref="PublishMessageEvent"/>.
        /// </summary>
        /// <param name="obj">Topic name the message was consumed from.</param>
        /// <param name="m">Message body decoded as UTF-8 text.</param>
        protected virtual void OnPublishMessageEvent(string obj, string m)
        {
            PublishMessageEvent?.Invoke(obj, m);
        }

        /// <summary>
        /// Serializes <paramref name="message"/> and publishes it as a persistent
        /// message to the durable queue <paramref name="topicName"/> through the
        /// default exchange.
        /// </summary>
        /// <param name="topicName">Queue name; the queue is declared if it does not exist yet.</param>
        /// <param name="message">Message to publish.</param>
        /// <param name="serialize">Serializer to use; when <c>null</c> the one given to the constructor is used.</param>
        /// <returns><c>true</c> when the message was handed to the broker; <c>false</c> when not connected or on any failure.</returns>
        public bool Push<T>(string topicName, T message, ISerialize serialize) where T : Message, new()
        {
            try
            {
                IConnection connection;
                lock (_sync)
                {
                    connection = _conn;
                }

                if (connection == null)
                {
                    return false;
                }

                // The parameter used to be ignored; honour it and fall back to
                // the instance serializer for callers that pass null.
                ISerialize serializer = serialize ?? _serialize;

                using (IModel channel = connection.CreateModel())
                {
                    // Declare a durable queue; declaring an existing queue with the same name is a no-op.
                    channel.QueueDeclare(topicName, true, false, false, null);

                    string messageStr = serializer.SerializerStr(message);
                    byte[] buffer = Encoding.UTF8.GetBytes(messageStr);
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2; // 2 = persistent
                    channel.BasicPublish("", topicName, properties, buffer);
                }

                return true;
            }
            catch (Exception ex)
            {
                LogFailure("publish to '" + topicName + "'", ex);
                return false;
            }
        }

        /// <summary>
        /// Registers <paramref name="topicName"/> for consumption and starts a
        /// consumer for it if none is running. Subscribing to an already
        /// subscribed topic is a no-op.
        /// </summary>
        /// <param name="topicName">Queue name to consume from.</param>
        /// <returns>
        /// <c>true</c> when the topic is registered (the consumer is running,
        /// starting, or will be started by the monitor once connected);
        /// <c>false</c> when the consumer task could not be started.
        /// </returns>
        public bool Subscribe(string topicName)
        {
            Cache entry;
            lock (_sync)
            {
                if (!_cache.TryGetValue(topicName, out entry))
                {
                    entry = new Cache { TopicName = topicName, State = false };
                    _cache.Add(topicName, entry);
                }
            }

            return Subscribe(topicName, entry);
        }

        /// <summary>
        /// Starts a consumer task for <paramref name="cache"/> unless one is
        /// already running or the client is currently disconnected.
        /// </summary>
        private bool Subscribe(string topicName, Cache cache)
        {
            IConnection connection;
            lock (_sync)
            {
                if (cache.State)
                {
                    return true; // a consumer is already running or starting
                }

                connection = _networkState ? _conn : null;
                if (connection == null)
                {
                    // Not connected: leave the flag unset so the monitor
                    // subscribes as soon as the connection is back.
                    return true;
                }

                // Claim the subscription before the task starts so that a second
                // call (or a monitor pass) cannot start a duplicate consumer.
                cache.State = true;
            }

            try
            {
                Task.Factory.StartNew(
                    () => Consume(topicName, cache, connection),
                    CancellationToken.None,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default);
            }
            catch (Exception ex)
            {
                lock (_sync)
                {
                    cache.State = false;
                }

                LogFailure("starting consumer for '" + topicName + "'", ex);
                return false;
            }

            return true;
        }

        /// <summary>
        /// Blocking consumer loop for one topic. Runs until the channel or the
        /// event handler throws; the connection is then marked as down so the
        /// monitor reconnects and resubscribes.
        /// </summary>
        private void Consume(string topicName, Cache cache, IConnection connection)
        {
            try
            {
                using (IModel channel = connection.CreateModel())
                {
                    // Declare a durable queue; declaring an existing queue with the same name is a no-op.
                    channel.QueueDeclare(topicName, true, false, false, null);

                    // Prefetch 1: no further message is delivered until the current one is acknowledged.
                    channel.BasicQos(0, 1, false);

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);

                    // Consume with manual acknowledgement.
                    channel.BasicConsume(topicName, false, consumer);

                    while (true)
                    {
                        // Blocks until a message is available.
                        BasicDeliverEventArgs ea = consumer.Queue.Dequeue();
                        string body = Encoding.UTF8.GetString(ea.Body);

                        OnPublishMessageEvent(topicName, body);

                        // Acknowledge only after the handlers returned normally.
                        channel.BasicAck(ea.DeliveryTag, false);
                        TouchHeartbeat();
                    }
                }
            }
            catch (Exception ex)
            {
                LogFailure("consumer for '" + topicName + "'", ex);

                lock (_sync)
                {
                    // Only react when this consumer still belongs to the current
                    // connection. A consumer dying because the monitor replaced
                    // its connection must not mark the new connection as down
                    // nor clear the flag of its successor.
                    if (ReferenceEquals(connection, _conn))
                    {
                        cache.State = false;
                        _networkState = false;
                    }
                }
            }
        }

        /// <summary>
        /// Never-ending monitor: reconnects when needed and (re)starts consumers
        /// for every registered topic that has none.
        /// </summary>
        private void MonitorLoop()
        {
            while (true)
            {
                try
                {
                    // Check whether the connection is healthy.
                    if (!_networkState || IsHeartbeatStale())
                    {
                        Reconnect();
                    }

                    if (_networkState)
                    {
                        // Iterate over a snapshot: Subscribe(string) may add
                        // entries from another thread at any time.
                        foreach (Cache entry in SnapshotSubscriptions())
                        {
                            Subscribe(entry.TopicName, entry); // no-op when already subscribed
                        }
                    }
                }
                catch (Exception ex)
                {
                    LogFailure("connection monitor", ex);
                }

                Thread.Sleep(MonitorInterval);
            }
        }

        /// <summary>
        /// Drops the current connection (if any), marks every topic as
        /// unsubscribed and opens a new connection. Throws when the new
        /// connection cannot be opened; the client then stays disconnected.
        /// </summary>
        private void Reconnect()
        {
            IConnection stale;
            lock (_sync)
            {
                stale = _conn;
                _conn = null;
                _networkState = false;
                foreach (Cache entry in _cache.Values)
                {
                    entry.State = false;
                }
            }

            // Closing the old connection releases its resources and makes its
            // blocked consumers fail, so they cannot live on next to the new ones.
            CloseQuietly(stale);

            IConnection fresh = _factory.CreateConnection();
            lock (_sync)
            {
                _conn = fresh;
                _networkState = true;
            }

            // Restart the idle timer; otherwise an idle client would reconnect
            // on every monitor pass once the timeout has elapsed.
            TouchHeartbeat();
        }

        /// <summary>Returns a copy of the registered subscriptions.</summary>
        private List<Cache> SnapshotSubscriptions()
        {
            lock (_sync)
            {
                return new List<Cache>(_cache.Values);
            }
        }

        /// <summary>Records "now" as the last sign of life.</summary>
        private void TouchHeartbeat()
        {
            Interlocked.Exchange(ref _heartbeatTicks, DateTime.UtcNow.Ticks);
        }

        /// <summary>Tells whether the last heartbeat is older than <see cref="HeartbeatTimeout"/>.</summary>
        private bool IsHeartbeatStale()
        {
            long last = Interlocked.Read(ref _heartbeatTicks);
            return DateTime.UtcNow.Ticks - last > HeartbeatTimeout.Ticks;
        }

        /// <summary>Disposes a connection, ignoring (but tracing) any failure.</summary>
        private static void CloseQuietly(IConnection connection)
        {
            if (connection == null)
            {
                return;
            }

            try
            {
                connection.Dispose();
            }
            catch (Exception ex)
            {
                LogFailure("closing stale connection", ex);
            }
        }

        /// <summary>Writes a swallowed failure to the trace output instead of dropping it silently.</summary>
        private static void LogFailure(string context, Exception ex)
        {
            System.Diagnostics.Trace.TraceWarning("RabbitMqClient: {0} failed: {1}", context, ex);
        }

        /// <summary>
        /// Builds the factory used by the single-argument constructor.
        /// </summary>
        private static ConnectionFactory CreateDefaultFactory()
        {
            // NOTE(review): broker address and credentials are hard-coded here.
            // Kept for backward compatibility; prefer the constructor overload
            // that takes a ConnectionFactory built from configuration.
            return new ConnectionFactory
            {
                HostName = "192.168.2.111",
                UserName = "admin", // user name
                Password = "123456", // password
                Port = 5672,
                AutomaticRecoveryEnabled = true
            };
        }
    }

    /// <summary>
    /// Facade over <see cref="RabbitMqClient"/>: publishes typed messages to the
    /// topic named by the message itself and turns received JSON payloads into
    /// typed events.
    /// </summary>
    public class CommunManager
    {
        private readonly RabbitMqClient _commun;
        private readonly ISerialize _serialize;

        /// <summary>
        /// AVI real-time production message.
        /// </summary>
        public event Action<AviProductionMessage> AviMessageEvent;

        /// <summary>
        /// Creates the underlying client and starts listening to its messages.
        /// </summary>
        /// <param name="serialize">Serializer used for both publishing and decoding received payloads.</param>
        public CommunManager(ISerialize serialize)
        {
            _serialize = serialize;
            _commun = new RabbitMqClient(serialize);
            _commun.PublishMessageEvent += _commun_PublishMessageEvent;
        }

        /// <summary>
        /// Publishes <paramref name="message"/> to the topic given by its <c>TopicName</c>.
        /// </summary>
        /// <returns><c>true</c> when the message was published; <c>false</c> when it is <c>null</c> or publishing failed.</returns>
        public bool Push<T>(T message) where T : Message, new()
        {
            if (message == null)
            {
                return false;
            }

            return _commun.Push(message.TopicName, message, _serialize);
        }

        /// <summary>
        /// Subscribes to the topic given by the <c>TopicName</c> of <paramref name="message"/>.
        /// </summary>
        /// <returns><c>true</c> when the subscription was registered; <c>false</c> when the message or its topic name is missing, or subscribing failed.</returns>
        public bool Subscribe<T>(T message) where T : Message, new()
        {
            if (message == null || string.IsNullOrEmpty(message.TopicName))
            {
                return false;
            }

            return _commun.Subscribe(message.TopicName);
        }

        /// <summary>
        /// Decodes a received payload and raises the event matching its message type.
        /// </summary>
        /// <param name="topicName">Topic the payload was received on.</param>
        /// <param name="jsonStr">Serialized message.</param>
        private void _commun_PublishMessageEvent(string topicName, string jsonStr)
        {
            // First pass: decode only the base type to learn the message type.
            var message = _serialize.Deserialize<Message>(jsonStr);
            if (message == null)
            {
                System.Diagnostics.Trace.TraceWarning(
                    "CommunManager: payload on topic '{0}' could not be decoded; ignored.", topicName);
                return;
            }

            // Dispatch for other message types (broadcast-client, simulation,
            // kanban, TV-board) is currently disabled; add one case per
            // MessageEnum member when re-enabling them.
            switch (message.Type)
            {
                case MessageEnum.AviProductionMessage:
                    // Second pass: decode the payload as the concrete type.
                    AviProductionMessage avimMessage = _serialize.Deserialize<AviProductionMessage>(jsonStr);
                    AviMessageEvent?.Invoke(avimMessage);
                    break;
                default:
                    // This runs on the consumer thread. Throwing here prevents
                    // the acknowledgement, so the same message would be
                    // redelivered and rejected again and again, blocking the
                    // queue. Unsupported types are therefore traced and skipped.
                    System.Diagnostics.Trace.TraceWarning(
                        "CommunManager: unsupported message type '{0}' on topic '{1}'; ignored.",
                        message.Type, topicName);
                    break;
            }
        }
    }
}
